You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/04 08:14:32 UTC

[iotdb] branch MemoryFreeBug created (now 04f518e711)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch MemoryFreeBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 04f518e711 Fix a memory free bug and delete useless O(N^2) check in DriverContext

This branch includes the following new commits:

     new 04f518e711 Fix a memory free bug and delete useless O(N^2) check in DriverContext

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Fix a memory free bug and delete useless O(N^2) check in DriverContext

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch MemoryFreeBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 04f518e71162acb85cf0e7d417f1e392214e2751
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Apr 4 16:14:18 2023 +0800

    Fix a memory free bug and delete useless O(N^2) check in DriverContext
---
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |  2 +-
 .../db/mpp/execution/driver/DriverContext.java     | 10 ----
 .../execution/exchange/MPPDataExchangeManager.java | 57 +++++++++++++---------
 3 files changed, 36 insertions(+), 33 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index 24164c4e89..1d5ca26b22 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -58,7 +58,7 @@ public class FragmentInstanceId {
   }
 
   public String getFragmentInstanceId() {
-    return fragmentId + "." + instanceId;
+    return fragmentId.getId() + "." + instanceId;
   }
 
   public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 58a131545f..9b0ae75f28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -29,8 +29,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 public class DriverContext {
 
   private boolean inputDriver = true;
@@ -51,14 +49,6 @@ public class DriverContext {
 
   public OperatorContext addOperatorContext(
       int operatorId, PlanNodeId planNodeId, String operatorType) {
-    checkArgument(operatorId >= 0, "operatorId is negative");
-
-    for (OperatorContext operatorContext : operatorContexts) {
-      checkArgument(
-          operatorId != operatorContext.getOperatorId(),
-          "A context already exists for operatorId %s",
-          operatorId);
-    }
 
     OperatorContext operatorContext =
         new OperatorContext(operatorId, planNodeId, operatorType, this);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 9a58904c79..73729fe0ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -485,11 +485,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       // FragmentInstanceContext
       FragmentInstanceContext instanceContext) {
 
-    LOGGER.debug(
-        "Create local sink handle to plan node {} of {} for {}",
-        remotePlanNodeId,
-        remoteFragmentInstanceId,
-        localFragmentInstanceId);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Create local sink handle to plan node {} of {} for {}",
+          remotePlanNodeId,
+          remoteFragmentInstanceId,
+          localFragmentInstanceId);
+    }
 
     SharedTsBlockQueue queue;
     Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(remoteFragmentInstanceId);
@@ -515,7 +517,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    */
   public ISinkChannel createLocalSinkChannelForPipeline(
       DriverContext driverContext, String planNodeId) {
-    LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
+    }
     SharedTsBlockQueue queue =
         new SharedTsBlockQueue(
             driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
@@ -537,11 +541,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       // FragmentInstanceContext
       FragmentInstanceContext instanceContext) {
 
-    LOGGER.debug(
-        "Create sink handle to plan node {} of {} for {}",
-        remotePlanNodeId,
-        remoteFragmentInstanceId,
-        localFragmentInstanceId);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Create sink handle to plan node {} of {} for {}",
+          remotePlanNodeId,
+          remoteFragmentInstanceId,
+          localFragmentInstanceId);
+    }
 
     return new SinkChannel(
         remoteEndpoint,
@@ -623,7 +629,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    */
   public ISourceHandle createLocalSourceHandleForPipeline(
       SharedTsBlockQueue queue, DriverContext context) {
-    LOGGER.debug("Create local source handle for {}", context.getDriverTaskID());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Create local source handle for {}", context.getDriverTaskID());
+    }
     return new LocalSourceHandle(
         queue,
         new PipelineSourceHandleListenerImpl(context::failed),
@@ -647,11 +655,14 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
               + " exists.");
     }
 
-    LOGGER.debug(
-        "Create local source handle from {} for plan node {} of {}",
-        remoteFragmentInstanceId,
-        localPlanNodeId,
-        localFragmentInstanceId);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Create local source handle from {} for plan node {} of {}",
+          remoteFragmentInstanceId,
+          localPlanNodeId,
+          localFragmentInstanceId);
+    }
+
     SharedTsBlockQueue queue;
     ISinkHandle sinkHandle = shuffleSinkHandles.get(remoteFragmentInstanceId);
     if (sinkHandle != null) {
@@ -692,11 +703,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
               + " exists.");
     }
 
-    LOGGER.debug(
-        "Create source handle from {} for plan node {} of {}",
-        remoteFragmentInstanceId,
-        localPlanNodeId,
-        localFragmentInstanceId);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Create source handle from {} for plan node {} of {}",
+          remoteFragmentInstanceId,
+          localPlanNodeId,
+          localFragmentInstanceId);
+    }
 
     SourceHandle sourceHandle =
         new SourceHandle(