You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/08 07:06:26 UTC

[iotdb] 01/01: Set some runtime fields in FIConetxt to null when the FI is done

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ReduceContextSize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4bbdce7326101e23c5edc89f3edcdb69035e71f9
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Mar 8 15:03:57 2023 +0800

    Set some runtime fields in FIConetxt to null when the FI is done
---
 .../iotdb/db/mpp/execution/driver/DataDriver.java   |  6 ++++++
 .../execution/fragment/FragmentInstanceContext.java |  8 ++++++--
 .../fragment/FragmentInstanceExecution.java         |  4 ++++
 .../execution/fragment/FragmentInstanceManager.java | 21 ++++++++++-----------
 4 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index 287f034954..d7378a67b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -71,6 +71,12 @@ public class DataDriver extends Driver {
           ((DataDriverContext) driverContext).getSourceOperators();
       if (sourceOperators != null && !sourceOperators.isEmpty()) {
         QueryDataSource dataSource = initQueryDataSource();
+        if (dataSource == null) {
+          // if this driver is being initialized, meanwhile the whole FI was aborted or cancelled
+          // for some reasons, we may get null QueryDataSource here.
+          // And it's safe for us to throw this exception here in such case.
+          throw new IllegalStateException("QueryDataSource should never be null!");
+        }
         sourceOperators.forEach(
             sourceOperator -> {
               // construct QueryDataSource for source operator
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 4d6c80233a..f810bc1ddb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -55,7 +55,7 @@ public class FragmentInstanceContext extends QueryContext {
 
   private IDataRegionForQuery dataRegion;
   private Filter timeFilter;
-  List<PartialPath> sourcePaths;
+  private List<PartialPath> sourcePaths;
   // Shared by all scan operators in this fragment instance to avoid memory problem
   private QueryDataSource sharedQueryDataSource;
   /** closed tsfile used in this fragment instance */
@@ -354,7 +354,7 @@ public class FragmentInstanceContext extends QueryContext {
    * All file paths used by this fragment instance must be cleared and thus the usage reference must
    * be decreased.
    */
-  protected void releaseResource() {
+  protected synchronized void releaseResource() {
     for (TsFileResource tsFile : closedFilePaths) {
       FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
     }
@@ -363,5 +363,9 @@ public class FragmentInstanceContext extends QueryContext {
       FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false);
     }
     unClosedFilePaths = null;
+    dataRegion = null;
+    timeFilter = null;
+    sourcePaths = null;
+    sharedQueryDataSource = null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 2415bed270..f9c17c9963 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -99,6 +99,10 @@ public class FragmentInstanceExecution {
         context.getFailureInfoList());
   }
 
+  public long getStartTime() {
+    return context.getStartTime();
+  }
+
   public FragmentInstanceStateMachine getStateMachine() {
     return stateMachine;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 4c4dd4187b..6e5390fdc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -92,12 +92,12 @@ public class FragmentInstanceManager {
     this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
 
     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-        instanceManagementExecutor, this::removeOldInstances, 200, 200, TimeUnit.MILLISECONDS);
+        instanceManagementExecutor, this::removeOldInstances, 2000, 2000, TimeUnit.MILLISECONDS);
     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
         instanceManagementExecutor,
         this::cancelTimeoutFlushingInstances,
-        200,
-        200,
+        2000,
+        2000,
         TimeUnit.MILLISECONDS);
 
     this.intoOperationExecutor =
@@ -297,14 +297,13 @@ public class FragmentInstanceManager {
 
   private void cancelTimeoutFlushingInstances() {
     long now = System.currentTimeMillis();
-    instanceContext.entrySet().stream()
-        .filter(
-            entry -> {
-              FragmentInstanceContext context = entry.getValue();
-              return context.getStateMachine().getState() == FragmentInstanceState.FLUSHING
-                  && (now - context.getStartTime()) > QUERY_TIMEOUT_MS;
-            })
-        .forEach(entry -> entry.getValue().failed(new TimeoutException()));
+    instanceExecution.forEach(
+        (key, execution) -> {
+          if (execution.getStateMachine().getState() == FragmentInstanceState.FLUSHING
+              && (now - execution.getStartTime()) > QUERY_TIMEOUT_MS) {
+            execution.getStateMachine().failed(new TimeoutException());
+          }
+        });
   }
 
   public ExecutorService getIntoOperationExecutor() {