You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/13 03:51:53 UTC

[iotdb] branch changcheng_0213 created (now 43056b51a2)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch changcheng_0213
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 43056b51a2 [IOTDB-5501] Fix memory leak in MemoryPool

This branch includes the following new commits:

     new 043dcc8fc3 change logic of cross selector
     new 43056b51a2 [IOTDB-5501] Fix memory leak in MemoryPool

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/02: [IOTDB-5501] Fix memory leak in MemoryPool

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch changcheng_0213
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 43056b51a26ac8477a70e9f51e1596d5ca8390e3
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri Feb 10 11:02:52 2023 +0800

    [IOTDB-5501] Fix memory leak in MemoryPool
---
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |  7 ++++-
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 23 ++++++++++----
 .../db/mpp/execution/exchange/SinkHandle.java      | 23 ++++++++++----
 .../db/mpp/execution/exchange/SourceHandle.java    | 25 ++++++++++++----
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  | 35 ++++++++++++++++------
 .../execution/exchange/LocalSinkHandleTest.java    | 14 +++++++--
 .../db/mpp/execution/exchange/SinkHandleTest.java  | 28 +++++++++++------
 .../mpp/execution/exchange/SourceHandleTest.java   | 10 +++++--
 8 files changed, 125 insertions(+), 40 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index b1c12bb729..793066b94a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -107,6 +107,11 @@ public class FragmentInstanceId {
   }
 
   public static String createFullId(String queryId, int fragmentId, String instanceId) {
-    return queryId + '.' + fragmentId + '.' + instanceId;
+    return queryId + "." + fragmentId + "." + instanceId;
+  }
+
+  public static String createFragmentInstanceIdFromTFragmentInstanceId(
+      TFragmentInstanceId tFragmentInstanceId) {
+    return tFragmentInstanceId.getFragmentId() + "." + tFragmentInstanceId.getInstanceId();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index ade913aebc..a40202de46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -49,6 +50,8 @@ public class SharedTsBlockQueue {
 
   private final String localPlanNodeId;
 
+  private final String fullFragmentInstanceId;
+
   private final LocalMemoryManager localMemoryManager;
 
   private boolean noMoreTsBlocks = false;
@@ -81,6 +84,8 @@ public class SharedTsBlockQueue {
       LocalMemoryManager localMemoryManager) {
     this.localFragmentInstanceId =
         Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null");
+    this.fullFragmentInstanceId =
+        FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null");
     this.localMemoryManager =
         Validate.notNull(localMemoryManager, "local memory manager cannot be null");
@@ -159,7 +164,7 @@ public class SharedTsBlockQueue {
         .getQueryPool()
         .free(
             localFragmentInstanceId.getQueryId(),
-            localFragmentInstanceId.getInstanceId(),
+            fullFragmentInstanceId,
             localPlanNodeId,
             tsBlock.getRetainedSizeInBytes());
     bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
@@ -186,7 +191,7 @@ public class SharedTsBlockQueue {
             .getQueryPool()
             .reserve(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 tsBlock.getRetainedSizeInBytes(),
                 maxBytesCanReserve);
@@ -233,11 +238,15 @@ public class SharedTsBlockQueue {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
@@ -258,11 +267,15 @@ public class SharedTsBlockQueue {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
@@ -283,7 +296,7 @@ public class SharedTsBlockQueue {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 139387d510..fbc36ed34d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
@@ -69,6 +70,8 @@ public class SinkHandle implements ISinkHandle {
 
   private final String localPlanNodeId;
   private final TFragmentInstanceId localFragmentInstanceId;
+
+  private final String fullFragmentInstanceId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
   private final TsBlockSerde serde;
@@ -122,6 +125,8 @@ public class SinkHandle implements ISinkHandle {
     this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.fullFragmentInstanceId =
+        FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
     this.serde = Validate.notNull(serde);
@@ -138,7 +143,7 @@ public class SinkHandle implements ISinkHandle {
             .getQueryPool()
             .reserve(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
                 DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) // actually we only know maxBytesCanReserve after
@@ -179,7 +184,7 @@ public class SinkHandle implements ISinkHandle {
               .getQueryPool()
               .reserve(
                   localFragmentInstanceId.getQueryId(),
-                  localFragmentInstanceId.getInstanceId(),
+                  fullFragmentInstanceId,
                   localPlanNodeId,
                   retainedSizeInBytes,
                   maxBytesCanReserve)
@@ -223,11 +228,15 @@ public class SinkHandle implements ISinkHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
     sinkHandleListener.onAborted(this);
     logger.debug("[EndAbortSinkHandle]");
   }
@@ -243,11 +252,15 @@ public class SinkHandle implements ISinkHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
     sinkHandleListener.onFinish(this);
     logger.debug("[EndCloseSinkHandle]");
   }
@@ -327,7 +340,7 @@ public class SinkHandle implements ISinkHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               freedBytes);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index d164e9d1de..386fdd40b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
@@ -68,6 +69,8 @@ public class SourceHandle implements ISourceHandle {
   private final TEndPoint remoteEndpoint;
   private final TFragmentInstanceId remoteFragmentInstanceId;
   private final TFragmentInstanceId localFragmentInstanceId;
+
+  private final String fullFragmentInstanceId;
   private final String localPlanNodeId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
@@ -123,6 +126,8 @@ public class SourceHandle implements ISourceHandle {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint);
     this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.fullFragmentInstanceId =
+        FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
@@ -172,7 +177,7 @@ public class SourceHandle implements ISourceHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               retainedSize);
 
@@ -214,7 +219,7 @@ public class SourceHandle implements ISourceHandle {
               .getQueryPool()
               .reserve(
                   localFragmentInstanceId.getQueryId(),
-                  localFragmentInstanceId.getInstanceId(),
+                  fullFragmentInstanceId,
                   localPlanNodeId,
                   bytesToReserve,
                   maxBytesCanReserve);
@@ -316,11 +321,15 @@ public class SourceHandle implements ISourceHandle {
             .getQueryPool()
             .free(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 bufferRetainedSizeInBytes);
         bufferRetainedSizeInBytes = 0;
       }
+      localMemoryManager
+          .getQueryPool()
+          .clearMemoryReservationMap(
+              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
       aborted = true;
       sourceHandleListener.onAborted(this);
     }
@@ -349,11 +358,15 @@ public class SourceHandle implements ISourceHandle {
             .getQueryPool()
             .free(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 bufferRetainedSizeInBytes);
         bufferRetainedSizeInBytes = 0;
       }
+      localMemoryManager
+          .getQueryPool()
+          .clearMemoryReservationMap(
+              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
       closed = true;
       currSequenceId = lastSequenceId + 1;
       sourceHandleListener.onFinished(this);
@@ -417,7 +430,7 @@ public class SourceHandle implements ISourceHandle {
         "Query[%s]-[%s-%s-SourceHandle-%s]",
         localFragmentInstanceId.getQueryId(),
         localFragmentInstanceId.getFragmentId(),
-        localFragmentInstanceId.getInstanceId(),
+        fullFragmentInstanceId,
         localPlanNodeId);
   }
 
@@ -527,7 +540,7 @@ public class SourceHandle implements ISourceHandle {
             .getQueryPool()
             .free(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 reservedBytes);
         sourceHandleListener.onFailure(SourceHandle.this, t);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index 190cf24eb0..6972111810 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -292,14 +292,11 @@ public class MemoryPool {
       Validate.isTrue(bytes <= queryReservedBytes);
 
       queryReservedBytes -= bytes;
-      if (queryReservedBytes == 0) {
-        queryMemoryReservations.get(queryId).get(fragmentInstanceId).remove(planNodeId);
-      } else {
-        queryMemoryReservations
-            .get(queryId)
-            .get(fragmentInstanceId)
-            .put(planNodeId, queryReservedBytes);
-      }
+      queryMemoryReservations
+          .get(queryId)
+          .get(fragmentInstanceId)
+          .put(planNodeId, queryReservedBytes);
+
       reservedBytes -= bytes;
 
       if (memoryReservationFutures.isEmpty()) {
@@ -350,7 +347,7 @@ public class MemoryPool {
         future.set(null);
       } catch (Throwable t) {
         // ignore it, because we still need to notify other future
-        LOGGER.error("error happened while trying to free memory: ", t);
+        LOGGER.warn("error happened while trying to free memory: ", t);
       }
     }
   }
@@ -369,4 +366,24 @@ public class MemoryPool {
   public long getReservedBytes() {
     return reservedBytes;
   }
+
+  public void clearMemoryReservationMap(
+      String queryId, String fragmentInstanceId, String planNodeId) {
+    if (queryMemoryReservations.get(queryId) == null
+        || queryMemoryReservations.get(queryId).get(fragmentInstanceId) == null) {
+      return;
+    }
+    Map<String, Long> planNodeIdToBytesReserved =
+        queryMemoryReservations.get(queryId).get(fragmentInstanceId);
+    if (planNodeIdToBytesReserved.get(planNodeId) == null
+        || planNodeIdToBytesReserved.get(planNodeId) <= 0) {
+      planNodeIdToBytesReserved.remove(planNodeId);
+      if (planNodeIdToBytesReserved.isEmpty()) {
+        queryMemoryReservations.get(queryId).remove(fragmentInstanceId);
+      }
+      if (queryMemoryReservations.get(queryId).isEmpty()) {
+        queryMemoryReservations.remove(queryId);
+      }
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 0a022ae06b..8bcb1b300b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.execution.exchange;
 
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -86,7 +87,8 @@ public class LocalSinkHandleTest {
     Mockito.verify(spyMemoryPool, Mockito.times(11))
         .reserve(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                remoteFragmentInstanceId),
             remotePlanNodeId,
             mockTsBlockSize,
             Long.MAX_VALUE);
@@ -102,7 +104,12 @@ public class LocalSinkHandleTest {
     Assert.assertFalse(localSinkHandle.isFinished());
     Assert.assertEquals(0L, localSinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(spyMemoryPool, Mockito.times(11))
-        .free(queryId, localFragmentInstanceId.getInstanceId(), remotePlanNodeId, mockTsBlockSize);
+        .free(
+            queryId,
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                remoteFragmentInstanceId),
+            remotePlanNodeId,
+            mockTsBlockSize);
 
     // Set no-more-TsBlocks.
     localSinkHandle.setNoMoreTsBlocks();
@@ -169,7 +176,8 @@ public class LocalSinkHandleTest {
     Mockito.verify(spyMemoryPool, Mockito.times(11))
         .reserve(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                remoteFragmentInstanceId),
             remotePlanNodeId,
             mockTsBlockSize,
             Long.MAX_VALUE);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index 0c8ebf9f73..d50e69dd2a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -116,7 +117,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //
+    // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
@@ -164,7 +166,8 @@ public class SinkHandleTest {
     Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
         .free(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -201,7 +204,8 @@ public class SinkHandleTest {
     MemoryPool mockMemoryPool =
         Utils.createMockBlockedMemoryPool(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock,
             mockTsBlockSize);
@@ -261,7 +265,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //
+    // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
@@ -302,7 +307,8 @@ public class SinkHandleTest {
     Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
         .free(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock * mockTsBlockSize);
 
@@ -318,7 +324,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(3))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //
+    // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
@@ -378,7 +385,8 @@ public class SinkHandleTest {
     Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(2))
         .free(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -401,7 +409,8 @@ public class SinkHandleTest {
     MemoryPool mockMemoryPool =
         Utils.createMockBlockedMemoryPool(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock,
             mockTsBlockSize);
@@ -462,7 +471,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //
+    // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index 1a5ad7560f..d0b8f49f29 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -252,7 +253,8 @@ public class SourceHandleTest {
       Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6))
           .reserve(
               queryId,
-              localFragmentInstanceId.getInstanceId(),
+              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                  localFragmentInstanceId),
               localPlanNodeId,
               MOCK_TSBLOCK_SIZE,
               maxBytesCanReserve);
@@ -283,7 +285,11 @@ public class SourceHandleTest {
     for (int i = 0; i < numOfMockTsBlock; i++) {
       Mockito.verify(spyMemoryPool, Mockito.timeout(10_0000).times(i))
           .free(
-              queryId, localFragmentInstanceId.getInstanceId(), localPlanNodeId, MOCK_TSBLOCK_SIZE);
+              queryId,
+              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                  localFragmentInstanceId),
+              localPlanNodeId,
+              MOCK_TSBLOCK_SIZE);
       sourceHandle.receive();
       try {
         if (i < 5) {


[iotdb] 01/02: change logic of cross selector

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch changcheng_0213
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 043dcc8fc3109822f6600857ea46321a2cb6748a
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Jan 18 14:55:20 2023 +0800

    change logic of cross selector
---
 .../impl/RewriteCrossSpaceCompactionSelector.java         | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index 1a3bb4480d..8ce59d35c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.compaction.selector.utils.CrossSpaceCompaction
 import org.apache.iotdb.db.engine.compaction.selector.utils.CrossSpaceCompactionCandidate.CrossCompactionTaskResourceSplit;
 import org.apache.iotdb.db.engine.compaction.selector.utils.CrossSpaceCompactionCandidate.TsFileResourceCandidate;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.MergeException;
 import org.apache.iotdb.db.rescon.SystemInfo;
@@ -186,10 +187,20 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
       CrossCompactionTaskResource taskResource,
       TsFileResource unseqFile,
       List<TsFileResource> seqFiles,
-      long memoryCost) {
+      long memoryCost)
+      throws IOException {
+    // We add a hard limit for cross compaction: a selected unseqFile must have been compacted in
+    // inner space at least once. This is used to improve the priority of inner compaction and to
+    // avoid too much cross compaction with small files.
+    TsFileNameGenerator.TsFileName unseqFileName =
+        TsFileNameGenerator.getTsFileName(unseqFile.getTsFile().getName());
+    if (unseqFileName.getInnerCompactionCnt() < 1) {
+      return false;
+    }
     // currently, we must allow at least one unseqFile be selected to handle the situation that
     // an unseqFile has huge time range but few data points.
-    if (taskResource.getUnseqFiles().isEmpty()) {
+    // IMPORTANT: this logic is the opposite of the previous one
+    if (taskResource.getUnseqFiles().size() == 0) {
       return true;
     }
     long totalFileSize = unseqFile.getTsFileSize();